package com.bfxy.rocketmq.producer.transaction;

import com.bfxy.rocketmq.constants.Const;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;

/**
 * 生产端代码
 */
public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {
        //新建producer，名称为text_tx_producer_group_name
        TransactionMQProducer producer = new TransactionMQProducer("text_tx_producer_group_name");
        //对于TransactionMQProducer要有一个线程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5,
                100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000), //有界队列
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        //每个线程可以给它取个名字，以后在日志的处理啊异常的处理啊有必要的。
                        Thread thread = new Thread(r);
                        thread.setName("text_tx_producer_group_name" + "check-thread");
                        return thread;
                    }
                });

        //设置producer的NameServer:Const.NAMESERVER_ADDRESS
        //TODO
        producer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
        producer.setExecutorService(executorService);

        //实际的业务处理：这个对象主要做两件事情：1、异步的执行本地事物；2、做回查
        TransactionListener transactionListener = new TransactionListenerImpl();

        //回调的函数，也是执行并行操作的函数
        producer.setTransactionListener(transactionListener);
        producer.start();//抛异常

        //发事物消息
        Message message = new Message("test_tx_topic", "TagA", "K",
                ("hello rocketmq 4 tx!").getBytes(RemotingHelper.DEFAULT_CHARSET));// 字符集：RemotingHelper.DEFAULT_CHARSET。//抛异常
        //发消息
        producer.sendMessageInTransaction(message, "我是回调的参数");
        Thread.sleep(Integer.MAX_VALUE);//抛异常
        //用完之后要记得关闭
        producer.shutdown();

    }

}
